You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2020/03/21 01:12:39 UTC

[impala] 04/06: IMPALA-9357: Fix race condition in alter_database event

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch branch-3.4.0
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f41710d1bf2d33e89b318cb2c83781be0bc83f42
Author: Vihang Karajgaonkar <vi...@cloudera.com>
AuthorDate: Thu Feb 6 13:43:17 2020 -0800

    IMPALA-9357: Fix race condition in alter_database event
    
    The race condition is exposed intermittently on certain builds, causing
    the test_event_processing::test_self_events test to fail. This happens
    because we check for self-event identifiers in the Db object without
    taking a lock. When a DDL such as COMMENT ON DATABASE <db> IS 'test' is
    executed, the event processor thread may be triggered as soon as the
    ALTER_DATABASE event is generated. This can cause the event processor to
    fail self-event detection, since the self-event identifiers have not yet
    been added to the Db object.
    
    The fix adds a Db lock similar to the Table lock. Alter db operations
    in CatalogOpExecutor now take db locks instead of metastoreDdlLock_,
    which makes them consistent with the table locking protocol.
    
    Testing:
    1. Ran existing tests for events processor.
    2. This test was failing frequently on centos6 (about 1 in 3 runs).
    After the fix, I ran the test in a loop for 24 hours (197 iterations)
    and it did not fail.
    3. Ran core tests with CDP and CDH builds.
    
    Change-Id: I472fd8a55740769ee5cdb84e48422a4ab39a8d1e
    Reviewed-on: http://gerrit.cloudera.org:8080/15260
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    (cherry picked from commit 1d27b91a36f687f958b02738ba7899652b2cfec7)
---
 .../impala/catalog/CatalogServiceCatalog.java      | 138 +++++++++++--------
 fe/src/main/java/org/apache/impala/catalog/Db.java |  21 +--
 .../org/apache/impala/catalog/HdfsPartition.java   |  20 +--
 .../main/java/org/apache/impala/catalog/Table.java |  14 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 148 ++++++++++++++-------
 tests/custom_cluster/test_concurrent_ddls.py       |  34 +++--
 tests/custom_cluster/test_event_processing.py      |  11 +-
 7 files changed, 241 insertions(+), 145 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index b66d5ee..fb7ef4d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -192,9 +191,9 @@ public class CatalogServiceCatalog extends Catalog {
   private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2;
   // Timeout for acquiring a table lock
   // TODO: Make this configurable
-  private static final long TBL_LOCK_TIMEOUT_MS = 7200000;
+  private static final long LOCK_RETRY_TIMEOUT_MS = 7200000;
   // Time to sleep before retrying to acquire a table lock
-  private static final int TBL_LOCK_RETRY_MS = 10;
+  private static final int LOCK_RETRY_DELAY_MS = 10;
 
   private final TUniqueId catalogServiceId_;
 
@@ -402,8 +401,9 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Tries to acquire versionLock_ and the lock of 'tbl' in that order. Returns true if it
-   * successfully acquires both within TBL_LOCK_TIMEOUT_MS millisecs; both locks are held
-   * when the function returns. Returns false otherwise and no lock is held in this case.
+   * successfully acquires both within LOCK_RETRY_TIMEOUT_MS millisecs; both locks are
+   * held when the function returns. Returns false otherwise and no lock is held in
+   * this case.
    */
   public boolean tryLockTable(Table tbl) {
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
@@ -423,12 +423,45 @@ public class CatalogServiceCatalog extends Catalog {
         versionLock_.writeLock().unlock();
         try {
           // Sleep to avoid spinning and allow other operations to make progress.
-          Thread.sleep(TBL_LOCK_RETRY_MS);
+          Thread.sleep(LOCK_RETRY_DELAY_MS);
         } catch (InterruptedException e) {
           // ignore
         }
         end = System.currentTimeMillis();
-      } while (end - begin < TBL_LOCK_TIMEOUT_MS);
+      } while (end - begin < LOCK_RETRY_TIMEOUT_MS);
+      return false;
+    }
+  }
+
+  /**
+   * Similar to tryLock on a table, but works on a database object instead of Table.
+   * TODO: Refactor the code so that both table and db can be "lockable" using a single
+   * method.
+   */
+  public boolean tryLockDb(Db db) {
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
+        "Attempting to lock database " + db.getName())) {
+      long begin = System.currentTimeMillis();
+      long end;
+      do {
+        versionLock_.writeLock().lock();
+        if (db.getLock().tryLock()) {
+          if (LOG.isTraceEnabled()) {
+            end = System.currentTimeMillis();
+            LOG.trace(String.format("Lock for db %s was acquired in %d msec",
+                db.getName(), end - begin));
+          }
+          return true;
+        }
+        versionLock_.writeLock().unlock();
+        try {
+          // Sleep to avoid spinning and allow other operations to make progress.
+          Thread.sleep(LOCK_RETRY_DELAY_MS);
+        } catch (InterruptedException e) {
+          // ignore
+        }
+        end = System.currentTimeMillis();
+      } while (end - begin < LOCK_RETRY_TIMEOUT_MS);
       return false;
     }
   }
@@ -781,42 +814,6 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Gets the list of versions for in-flight events for the given table. Applicable
-   * only when external event processing is enabled.
-   * @param dbName database name
-   * @param tblName table name
-   * @return List of previous version numbers for in-flight events on this table.
-   * If table is not laoded returns a empty list. If event processing is disabled,
-   * returns a empty list
-   */
-  public List<Long> getInFlightVersionsForEvents(String dbName, String tblName)
-      throws DatabaseNotFoundException, TableNotFoundException {
-    Preconditions.checkState(isEventProcessingActive(),
-        "Event processing should be enabled before calling this method");
-    List<Long> result = Collections.EMPTY_LIST;
-    versionLock_.readLock().lock();
-    try {
-      Db db = getDb(dbName);
-      if (db == null) {
-        throw new DatabaseNotFoundException(
-            String.format("Database %s not found", dbName));
-      }
-      if (tblName == null) {
-        return db.getVersionsForInflightEvents();
-      }
-      Table tbl = getTable(dbName, tblName);
-      if (tbl == null) {
-        throw new TableNotFoundException(
-            String.format("Table %s not found", new TableName(dbName, tblName)));
-      }
-      if (tbl instanceof IncompleteTable) return result;
-      return tbl.getVersionsForInflightEvents();
-    } finally {
-      versionLock_.readLock().unlock();
-    }
-  }
-
-  /**
    * Evaluates if the information from an event (serviceId and versionNumber) matches to
    * the catalog object. If there is match, the in-flight version for that object is
    * removed and method returns true. If it does not match, returns false
@@ -830,20 +827,45 @@ public class CatalogServiceCatalog extends Catalog {
         "Event processing should be enabled when calling this method");
     long versionNumber = ctx.getVersionNumberFromEvent();
     String serviceIdFromEvent = ctx.getServiceIdFromEvent();
+    LOG.debug("Input arguments for self-event evaluation: {} {}",versionNumber,
+        serviceIdFromEvent);
     // no version info or service id in the event
-    if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) return false;
+    if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) {
+      LOG.info("Not a self-event since the given version is {} and service id is {}",
+          versionNumber, serviceIdFromEvent);
+      return false;
+    }
     // if the service id from event doesn't match with our service id this is not a
     // self-event
-    if (!getCatalogServiceId().equals(serviceIdFromEvent)) return false;
+    if (!getCatalogServiceId().equals(serviceIdFromEvent)) {
+      LOG.info("Not a self-event because service id of this catalog {} does not match "
+          + "with one in event {}.", getCatalogServiceId(), serviceIdFromEvent);
+      return false;
+    }
     Db db = getDb(ctx.getDbName());
     if (db == null) {
       throw new DatabaseNotFoundException("Database " + ctx.getDbName() + " not found");
     }
     // if the given tblName is null we look db's in-flight events
     if (ctx.getTblName() == null) {
-      return db.removeFromVersionsForInflightEvents(versionNumber);
+      //TODO use read/write locks for both table and db
+      if (!tryLockDb(db)) {
+        throw new CatalogException("Could not acquire lock on database object " +
+            db.getName());
+      }
+      versionLock_.writeLock().unlock();
+      try {
+        boolean removed = db.removeFromVersionsForInflightEvents(versionNumber);
+        if (!removed) {
+          LOG.info("Could not find version {} in the in-flight event list of database "
+              + "{}", versionNumber, db.getName());
+        }
+        return removed;
+      } finally {
+        db.getLock().unlock();
+      }
     }
-    Table tbl = getTable(ctx.getDbName(), ctx.getTblName());
+    Table tbl = db.getTable(ctx.getTblName());
     if (tbl == null) {
       throw new TableNotFoundException(
           String.format("Table %s.%s not found", ctx.getDbName(), ctx.getTblName()));
@@ -859,7 +881,12 @@ public class CatalogServiceCatalog extends Catalog {
       List<List<TPartitionKeyValue>> partitionKeyValues = ctx.getPartitionKeyValues();
       // if the partitionKeyValues is null, we look for tbl's in-flight events
       if (partitionKeyValues == null) {
-        return tbl.removeFromVersionsForInflightEvents(versionNumber);
+        boolean removed = tbl.removeFromVersionsForInflightEvents(versionNumber);
+        if (!removed) {
+          LOG.info("Could not find version {} in in-flight event list of table {}",
+              versionNumber, tbl.getFullName());
+        }
+        return removed;
       }
       if (tbl instanceof HdfsTable) {
         List<String> failingPartitions = new ArrayList<>();
@@ -872,8 +899,11 @@ public class CatalogServiceCatalog extends Catalog {
             // should clean up the self-event state on the rest of the partitions
             String partName = HdfsTable.constructPartitionName(partitionKeyValue);
             if (hdfsPartition == null) {
-              LOG.warn(String.format("Partition %s not found during self-event "
-                + "evaluation for the table %s", partName, tbl.getFullName()));
+              LOG.info("Partition {} not found during self-event "
+                + "evaluation for the table {}", partName, tbl.getFullName());
+            } else {
+              LOG.info("Could not find {} in in-flight event list of the partition {} "
+                  + "of table {}", versionNumber, partName, tbl.getFullName());
             }
             failingPartitions.add(partName);
           }
@@ -895,11 +925,9 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public void addVersionsForInflightEvents(Table tbl, long versionNumber) {
     if (!isEventProcessingActive()) return;
-    // we generally don't take locks on Incomplete tables since they are atomically
-    // replaced during load
-    Preconditions.checkState(
-        tbl instanceof IncompleteTable || tbl.getLock().isHeldByCurrentThread());
     tbl.addToVersionsForInflightEvents(versionNumber);
+    LOG.info("Added catalog version {} in table's {} in-flight events",
+        versionNumber, tbl.getFullName());
   }
 
   /**
@@ -911,6 +939,8 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public void addVersionsForInflightEvents(Db db, long versionNumber) {
     if (!isEventProcessingActive()) return;
+    LOG.info("Added catalog version {} in database's {} in-flight events",
+        versionNumber, db.getName());
     db.addToVersionsForInflightEvents(versionNumber);
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 4bfd070..330227c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -21,10 +21,10 @@ import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.impala.analysis.ColumnDef;
@@ -100,6 +100,10 @@ public class Db extends CatalogObjectImpl implements FeDb {
   // tracks the in-flight metastore events for this db
   private final InFlightEvents inFlightEvents_ = new InFlightEvents();
 
+  // lock to make sure modifications to the Db object are atomically done along with
+  // its associated HMS operation (eg. alterDbOwner or commentOnDb)
+  private final ReentrantLock dbLock_ = new ReentrantLock();
+
   public Db(String name, org.apache.hadoop.hive.metastore.api.Database msDb) {
     setMetastoreDb(name, msDb);
     tableCache_ = new CatalogObjectCache<>();
@@ -139,6 +143,8 @@ public class Db extends CatalogObjectImpl implements FeDb {
     return msDb.getParameters().remove(k) != null;
   }
 
+  public ReentrantLock getLock() { return dbLock_; }
+
   @Override // FeDb
   public boolean isSystemDb() { return isSystemDb_; }
   @Override // FeDb
@@ -497,18 +503,14 @@ public class Db extends CatalogObjectImpl implements FeDb {
   }
 
   /**
-   * Gets the current list of versions for in-flight events for this database
-   */
-  public List<Long> getVersionsForInflightEvents() {
-    return inFlightEvents_.getAll();
-  }
-
-  /**
    * Removes a given version from the collection of version numbers for in-flight events
    * @param versionNumber version number to remove from the collection
    * @return true if version was successfully removed, false if didn't exist
    */
   public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+    Preconditions.checkState(dbLock_.isHeldByCurrentThread(),
+        "removeFromVersionsForInflightEvents called without getting the db lock for "
+            + getName() + " database.");
     return inFlightEvents_.remove(versionNumber);
   }
 
@@ -520,6 +522,9 @@ public class Db extends CatalogObjectImpl implements FeDb {
    * @param versionNumber version number to add
    */
   public void addToVersionsForInflightEvents(long versionNumber) {
+    Preconditions.checkState(dbLock_.isHeldByCurrentThread(),
+        "addToVersionsForInFlightEvents called without getting the db lock for "
+            + getName() + " database.");
     if (!inFlightEvents_.add(versionNumber)) {
       LOG.warn(String.format("Could not add version %s to the list of in-flight "
           + "events. This could cause unnecessary database %s invalidation when the "
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index d0664df..45a0a83 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -851,18 +851,14 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
   }
 
   /**
-   * Gets the current list of versions for in-flight events for this partition
-   */
-  public List<Long> getVersionsForInflightEvents() {
-    return inFlightEvents_.getAll();
-  }
-
-  /**
    * Removes a given version from the in-flight events
    * @param versionNumber version number to remove
    * @return true if the versionNumber was removed, false if it didn't exist
    */
   public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+    Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
+        "removeFromVersionsForInflightEvents called without holding the table lock on "
+            + "partition " + getPartitionName() + " of table " + table_.getFullName());
     return inFlightEvents_.remove(versionNumber);
   }
 
@@ -871,6 +867,9 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
    * @param versionNumber version number to add
    */
   public void addToVersionsForInflightEvents(long versionNumber) {
+    Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
+        "addToVersionsForInflightEvents called without holding the table lock on "
+            + "partition " + getPartitionName() + " of table " + table_.getFullName());
     if (!inFlightEvents_.add(versionNumber)) {
       LOG.warn(String.format("Could not add %s version to the partition %s of table %s. "
           + "This could cause unnecessary refresh of the partition when the event is"
@@ -881,17 +880,22 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
 
   /**
    * Adds the version from the given Partition parameters. No-op if the parameters does
-   * not contain the <code>MetastoreEventPropertyKey.CATALOG_VERSION</code>
+   * not contain the <code>MetastoreEventPropertyKey.CATALOG_VERSION</code>. This is
+   * done to detect add partition events from this catalog which are generated when
+   * partitions are added or recovered.
    */
   private void addInflightVersionsFromParameters() {
     Preconditions.checkNotNull(hmsParameters_);
     Preconditions.checkState(inFlightEvents_.size() == 0);
+    // we should not check for table lock being held here since there are certain code
+    // paths which call this method without holding the table lock (eg. getOrLoadTable())
     if (!hmsParameters_.containsKey(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())) {
       return;
     }
     inFlightEvents_.add(Long.parseLong(
             hmsParameters_.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())));
   }
+
   /**
    * Marks this partition's metadata as "dirty" indicating that changes have been
    * made and this partition's metadata should not be reused during the next
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 6107a12..db9fe3e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -794,18 +794,14 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
   }
 
   /**
-   * Gets the current list of versions for in-flight events for this table
-   */
-  public List<Long> getVersionsForInflightEvents() {
-    return inFlightEvents.getAll();
-  }
-
-  /**
    * Removes a given version from the collection of version numbers for in-flight events
    * @param versionNumber version number to remove from the collection
    * @return true if version was successfully removed, false if didn't exist
    */
   public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+    Preconditions.checkState(tableLock_.isHeldByCurrentThread(),
+        "removeFromVersionsForInFlightEvents called without taking the table lock on "
+            + getFullName());
     return inFlightEvents.remove(versionNumber);
   }
 
@@ -819,6 +815,10 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
    * capacity
    */
   public void addToVersionsForInflightEvents(long versionNumber) {
+    // we generally don't take locks on Incomplete tables since they are atomically
+    // replaced during load
+    Preconditions.checkState(
+        this instanceof IncompleteTable || tableLock_.isHeldByCurrentThread());
     if (!inFlightEvents.add(versionNumber)) {
       LOG.warn(String.format("Could not add %s version to the table %s. This could "
           + "cause unnecessary refresh of the table when the event is received by the "
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 92d8820..098f2e9 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -216,21 +216,27 @@ import org.slf4j.LoggerFactory;
  * updates, DDL operations should not directly modify the HMS objects of the catalog
  * objects but should operate on copies instead.
  *
- * The CatalogOpExecutor uses table-level locking to protect table metadata during
- * concurrent modifications and is responsible for assigning a new catalog version when
- * a table is modified (e.g. alterTable()).
+ * The CatalogOpExecutor uses table-level or Db object level locking to protect table
+ * metadata or database metadata respectively during concurrent modifications and is
+ * responsible for assigning a new catalog version when a table/Db is modified
+ * (e.g. alterTable() or alterDb()).
  *
  * The following locking protocol is employed to ensure that modifying
- * the table metadata and assigning a new catalog version is performed atomically and
+ * the table/Db metadata and assigning a new catalog version is performed atomically and
  * consistently in the presence of concurrent DDL operations. The following pattern
  * ensures that the catalog lock is never held for a long period of time, preventing
- * other DDL operations from making progress. This pattern only applies to single-table
+ * other DDL operations from making progress. This pattern only applies to single-table/Db
  * update operations and requires the use of fair table locks to prevent starvation.
+ * Additionally, this locking protocol is also followed in case of CREATE/DROP
+ * FUNCTION. In case of CREATE/DROP FUNCTION, we take the Db object lock since
+ * certain FUNCTION are stored in the HMS database parameters. Using this approach
+ * also makes sure that adding or removing functions from different databases do not
+ * block each other.
  *
  *   DO {
  *     Acquire the catalog lock (see CatalogServiceCatalog.versionLock_)
- *     Try to acquire a table lock
- *     IF the table lock acquisition fails {
+ *     Try to acquire a table/Db lock
+ *     IF the table/Db lock acquisition fails {
  *       Release the catalog lock
  *       YIELD()
  *     ELSE
@@ -241,15 +247,15 @@ import org.slf4j.LoggerFactory;
  *
  *   Increment and get a new catalog version
  *   Release the catalog lock
- *   Modify table metadata
- *   Release table lock
+ *   Modify table/Db metadata
+ *   Release table/Db lock
  *
  * Note: The getCatalogObjects() function is the only case where this locking pattern is
  * not used since it accesses multiple catalog entities in order to compute a snapshot
  * of catalog metadata.
  *
- * Operations that CREATE/DROP catalog objects such as tables and databases employ the
- * following locking protocol:
+ * Operations that CREATE/DROP catalog objects such as tables and databases
+ * (except for functions, see above) employ the following locking protocol:
  * 1. Acquire the metastoreDdlLock_
  * 2. Update the Hive Metastore
  * 3. Increment and get a new catalog version
@@ -257,6 +263,7 @@ import org.slf4j.LoggerFactory;
  * 5. Grant/revoke owner privilege if authorization with ownership is enabled.
  * 6. Release the metastoreDdlLock_
  *
+ *
  * It is imperative that other operations that need to hold both the catalog lock and
  * table locks at the same time follow the same locking protocol and acquire these
  * locks in that particular order. Also, operations that modify table metadata
@@ -1356,21 +1363,23 @@ public class CatalogOpExecutor {
     }
     boolean isPersistentJavaFn =
         (fn.getBinaryType() == TFunctionBinaryType.JAVA) && fn.isPersistent();
-    synchronized (metastoreDdlLock_) {
-      Db db = catalog_.getDb(fn.dbName());
-      if (db == null) {
-        throw new CatalogException("Database: " + fn.dbName() + " does not exist.");
-      }
-      // Get a new catalog version to assign to the database being altered. This is
-      // needed for events processor as this method creates alter database events.
-      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
-      addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
+    Db db = catalog_.getDb(fn.dbName());
+    if (db == null) {
+      throw new CatalogException("Database: " + fn.dbName() + " does not exist.");
+    }
+
+    tryLock(db, "creating function " + fn.getClass().getSimpleName());
+    // Get a new catalog version to assign to the database being altered. This is
+    // needed for events processor as this method creates alter database events.
+    long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+    catalog_.getLock().writeLock().unlock();
+    try {
       // Search for existing functions with the same name or signature that would
       // conflict with the function being added.
-      for (Function function: db.getFunctions(fn.functionName())) {
+      for (Function function : db.getFunctions(fn.functionName())) {
         if (isPersistentJavaFn || (function.isPersistent() &&
             (function.getBinaryType() == TFunctionBinaryType.JAVA)) ||
-                function.compare(fn, Function.CompareMode.IS_INDISTINGUISHABLE)) {
+            function.compare(fn, Function.CompareMode.IS_INDISTINGUISHABLE)) {
           if (!params.if_not_exists) {
             throw new CatalogException("Function " + fn.functionName() +
                 " already exists.");
@@ -1386,15 +1395,16 @@ public class CatalogOpExecutor {
         // the corresponding Jar and add each signature to the catalog.
         Preconditions.checkState(fn instanceof ScalarFunction);
         org.apache.hadoop.hive.metastore.api.Function hiveFn =
-            ((ScalarFunction)fn).toHiveFunction();
+            ((ScalarFunction) fn).toHiveFunction();
         List<Function> funcs = FunctionUtils.extractFunctions(fn.dbName(), hiveFn,
             BackendConfig.INSTANCE.getBackendCfg().local_library_path);
         if (funcs.isEmpty()) {
           throw new CatalogException(
-            "No compatible function signatures found in class: " + hiveFn.getClassName());
+              "No compatible function signatures found in class: " + hiveFn
+                  .getClassName());
         }
         if (addJavaFunctionToHms(fn.dbName(), hiveFn, params.if_not_exists)) {
-          for (Function addedFn: funcs) {
+          for (Function addedFn : funcs) {
             if (LOG.isTraceEnabled()) {
               LOG.trace(String.format("Adding function: %s.%s", addedFn.dbName(),
                   addedFn.signatureString()));
@@ -1404,7 +1414,14 @@ public class CatalogOpExecutor {
           }
         }
       } else {
+        //TODO(Vihang): addFunction method below directly updates the database
+        // parameters. If the applyAlterDatabase method below throws an exception,
+        // catalog might end up in a inconsistent state. Ideally, we should make a copy
+        // of hms Database object and then update the Db once the HMS operation succeeds
+        // similar to what happens in alterDatabaseSetOwner method.
         if (catalog_.addFunction(fn)) {
+          addCatalogServiceIdentifiers(db.getMetaStoreDb(),
+              catalog_.getCatalogServiceId(), newCatalogVersion);
           // Flush DB changes to metastore
           applyAlterDatabase(db.getMetaStoreDb());
           addedFunctions.add(fn.toTCatalogObject());
@@ -1421,6 +1438,8 @@ public class CatalogOpExecutor {
       } else {
         addSummary(resp, "Function already exists.");
       }
+    } finally {
+      db.getLock().unlock();
     }
   }
 
@@ -2098,24 +2117,26 @@ public class CatalogOpExecutor {
   private void dropFunction(TDropFunctionParams params, TDdlExecResponse resp)
       throws ImpalaException {
     FunctionName fName = FunctionName.fromThrift(params.fn_name);
-    synchronized (metastoreDdlLock_) {
-      Db db = catalog_.getDb(fName.getDb());
-      if (db == null) {
-        if (!params.if_exists) {
-            throw new CatalogException("Database: " + fName.getDb()
-                + " does not exist.");
-        }
-        addSummary(resp, "Database does not exist.");
-        return;
+    Db db = catalog_.getDb(fName.getDb());
+    if (db == null) {
+      if (!params.if_exists) {
+        throw new CatalogException("Database: " + fName.getDb()
+            + " does not exist.");
       }
-      // Get a new catalog version to assign to the database being altered. This is
-      // needed for events processor as this method creates alter database events.
-      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
-      addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
+      addSummary(resp, "Database does not exist.");
+      return;
+    }
+
+    tryLock(db, "dropping function " + fName);
+    // Get a new catalog version to assign to the database being altered. This is
+    // needed for events processor as this method creates alter database events.
+    long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+    catalog_.getLock().writeLock().unlock();
+    try {
       List<TCatalogObject> removedFunctions = Lists.newArrayList();
       if (!params.isSetSignature()) {
         dropJavaFunctionFromHms(fName.getDb(), fName.getFunction(), params.if_exists);
-        for (Function fn: db.getFunctions(fName.getFunction())) {
+        for (Function fn : db.getFunctions(fName.getFunction())) {
           if (fn.getBinaryType() != TFunctionBinaryType.JAVA
               || !fn.isPersistent()) {
             continue;
@@ -2125,7 +2146,7 @@ public class CatalogOpExecutor {
         }
       } else {
         ArrayList<Type> argTypes = Lists.newArrayList();
-        for (TColumnType t: params.arg_types) {
+        for (TColumnType t : params.arg_types) {
           argTypes.add(Type.fromThrift(t));
         }
         Function desc = new Function(fName, argTypes, Type.INVALID, false);
@@ -2136,6 +2157,8 @@ public class CatalogOpExecutor {
                 "Function: " + desc.signatureString() + " does not exist.");
           }
         } else {
+          addCatalogServiceIdentifiers(db.getMetaStoreDb(),
+              catalog_.getCatalogServiceId(), newCatalogVersion);
           // Flush DB changes to metastore
           applyAlterDatabase(db.getMetaStoreDb());
           removedFunctions.add(fn.toTCatalogObject());
@@ -2152,6 +2175,8 @@ public class CatalogOpExecutor {
         addSummary(resp, "Function does not exist.");
       }
       resp.result.setVersion(catalog_.getCatalogVersion());
+    } finally {
+      db.getLock().unlock();
     }
   }
 
@@ -4576,16 +4601,19 @@ public class CatalogOpExecutor {
   }
 
   private void alterCommentOnDb(String dbName, String comment, TDdlExecResponse response)
-      throws ImpalaRuntimeException, CatalogException {
+      throws ImpalaRuntimeException, CatalogException, InternalException {
     Db db = catalog_.getDb(dbName);
     if (db == null) {
       throw new CatalogException("Database: " + dbName + " does not exist.");
     }
-    synchronized (metastoreDdlLock_) {
-      // Get a new catalog version to assign to the database being altered.
-      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
-      addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
+    tryLock(db, "altering the comment");
+    // Get a new catalog version to assign to the database being altered.
+    long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+    catalog_.getLock().writeLock().unlock();
+    try {
       Database msDb = db.getMetaStoreDb().deepCopy();
+      addCatalogServiceIdentifiers(msDb, catalog_.getCatalogServiceId(),
+          newCatalogVersion);
       msDb.setDescription(comment);
       try {
         applyAlterDatabase(msDb);
@@ -4597,6 +4625,8 @@ public class CatalogOpExecutor {
       // now that HMS alter operation has succeeded, add this version to list of inflight
       // events in catalog database if event processing is enabled
       catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
+    } finally {
+      db.getLock().unlock();
     }
     addSummary(response, "Updated database.");
   }
@@ -4623,11 +4653,14 @@ public class CatalogOpExecutor {
       TDdlExecResponse response) throws ImpalaException {
     Preconditions.checkNotNull(params.owner_name);
     Preconditions.checkNotNull(params.owner_type);
-    synchronized (metastoreDdlLock_) {
-      // Get a new catalog version to assign to the database being altered.
-      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
-      addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
+    tryLock(db, "altering the owner");
+    // Get a new catalog version to assign to the database being altered.
+    long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+    catalog_.getLock().writeLock().unlock();
+    try {
       Database msDb = db.getMetaStoreDb().deepCopy();
+      addCatalogServiceIdentifiers(msDb, catalog_.getCatalogServiceId(),
+          newCatalogVersion);
       String originalOwnerName = msDb.getOwnerName();
       PrincipalType originalOwnerType = msDb.getOwnerType();
       msDb.setOwnerName(params.owner_name);
@@ -4647,6 +4680,8 @@ public class CatalogOpExecutor {
       // now that HMS alter operation has succeeded, add this version to list of inflight
       // events in catalog database if event processing is enabled
       catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
+    } finally {
+      db.getLock().unlock();
     }
     addSummary(response, "Updated database.");
   }
@@ -4656,9 +4691,9 @@ public class CatalogOpExecutor {
    * No-op if event processing is disabled
    */
   private void addCatalogServiceIdentifiers(
-      Db db, String catalogServiceId, long newCatalogVersion) {
+      Database msDb, String catalogServiceId, long newCatalogVersion) {
     if (!catalog_.isEventProcessingActive()) return;
-    org.apache.hadoop.hive.metastore.api.Database msDb = db.getMetaStoreDb();
+    Preconditions.checkNotNull(msDb);
     msDb.putToParameters(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
         catalogServiceId);
     msDb.putToParameters(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
@@ -4775,6 +4810,17 @@ public class CatalogOpExecutor {
   }
 
   /**
+   * Try to lock the given Db in the catalog for the given operation. Throws
+   * InternalException if catalog is unable to lock the database.
+   */
+  private void tryLock(Db db, String operation) throws InternalException {
+    if (!catalog_.tryLockDb(db)) {
+      throw new InternalException(String.format("Error %s of database %s due to lock "
+          + "contention.", operation, db.getName()));
+    }
+  }
+
+  /**
    * Commits ACID transaction with given transaction id.
    * @param transactionId is the id of the transaction.
    * @throws TransactionException
diff --git a/tests/custom_cluster/test_concurrent_ddls.py b/tests/custom_cluster/test_concurrent_ddls.py
index 37a498a..695045b 100644
--- a/tests/custom_cluster/test_concurrent_ddls.py
+++ b/tests/custom_cluster/test_concurrent_ddls.py
@@ -97,22 +97,32 @@ class TestConcurrentDdls(CustomClusterTestSuite):
 
     def run_ddls(i):
       tbl_name = db + ".test_" + str(i)
-      for query_tmpl in [
+      # func_name = "f_" + str(i)
+      for query in [
+        # alter database operations
+        # TODO (IMPALA-9532): Uncomment the alter database operations
+        # "comment on database %s is 'test-concurrent-ddls'" % db,
+        # "alter database %s set owner user `test-user`" % db,
+        # "create function %s.%s() returns int location '%s/libTestUdfs.so' \
+        #    symbol='NoArgs'" % (db, func_name, WAREHOUSE),
+        # "drop function if exists %s.%s()" % (db, func_name),
         # Create a partitioned and unpartitioned table
-        "create table %s (i int)",
-        "create table %s_part (i int) partitioned by (j int)",
+        "create table %s (i int)" % tbl_name,
+        "create table %s_part (i int) partitioned by (j int)" % tbl_name,
         # Below queries could fail if running with invalidate metadata concurrently
-        "alter table %s_part add partition (j=1)",
-        "alter table %s_part add partition (j=2)",
-        "invalidate metadata %s_part",
-        "refresh %s",
-        "refresh %s_part",
-        "insert overwrite table %s select int_col from functional.alltypestiny",
-        "insert overwrite table %s_part partition(j=1) values (1), (2), (3), (4), (5)",
-        "insert overwrite table %s_part partition(j=2) values (1), (2), (3), (4), (5)"
+        "alter table %s_part add partition (j=1)" % tbl_name,
+        "alter table %s_part add partition (j=2)" % tbl_name,
+        "invalidate metadata %s_part" % tbl_name,
+        "refresh %s" % tbl_name,
+        "refresh %s_part" % tbl_name,
+        "insert overwrite table %s select int_col from "
+        "functional.alltypestiny" % tbl_name,
+        "insert overwrite table %s_part partition(j=1) "
+        "values (1), (2), (3), (4), (5)" % tbl_name,
+        "insert overwrite table %s_part partition(j=2) "
+        "values (1), (2), (3), (4), (5)" % tbl_name
       ]:
         try:
-          query = query_tmpl % tbl_name
           # TODO(IMPALA-9123): Timeout logic here does not work for DDLs since they are
           #  usually stuck in CREATED state and execute_async() won't return. We finally
           #  use timeout in pytest.mark.timeout() but it's not precise. We should find a
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index e9d5c4f..f39f55f 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -18,16 +18,13 @@
 import random
 import string
 import pytest
-import json
-import time
-import requests
 
-from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, \
-    SkipIfLocal, SkipIfHive2
+from tests.common.skip import SkipIfHive2
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.util.hive_utils import HiveDbWrapper
 from tests.util.event_processor_utils import EventProcessorUtils
+from tests.util.filesystem_utils import WAREHOUSE
 
 
 @SkipIfS3.hive
@@ -242,8 +239,12 @@ class TestEventProcessing(CustomClusterTestSuite):
     self_event_test_queries = {
       # Queries which will increment the self-events-skipped counter
       True: [
+          # ALTER_DATABASE case
           "comment on database {0} is 'self-event test database'".format(db_name),
           "alter database {0} set owner user `test-user`".format(db_name),
+          "create function {0}.f() returns int location '{1}/libTestUdfs.so' "
+          "symbol='NoArgs'".format(db_name, WAREHOUSE),
+          "drop function {0}.f()".format(db_name),
           # ALTER_TABLE case
           "alter table {0}.{1} set TBLPROPERTIES ('k'='v')".format(db_name, tbl_name),
           "alter table {0}.{1} ADD COLUMN c1 int".format(db_name, tbl_name),